Data Skew

In an ideal Spark application run, when Spark performs a join, for example, the join keys would be evenly distributed and each partition requiring processing would be roughly the same size. However, real business data is rarely so neat and cooperative. We often end up with a less than ideal data organization across the Spark cluster, which results in degraded performance due to data skew.

  • Data skew is not a Spark issue; rather, it is a data problem.
  • The cause of data skew is the uneven distribution of the underlying data. Uneven partitioning is sometimes unavoidable given the overall data layout or the nature of the query.
  • For joins and aggregations, Spark needs to co-locate all records of a single key in a single partition; records of a given key will always end up in one partition.
  • Similarly, records of other keys are distributed across other partitions. If a single partition becomes very large, the result is data skew, which is problematic for any query engine if no special handling is done.


Consider a case where partition A is three times larger than the other two partitions; it will therefore take approximately three times as long to compute. Since the next stage of processing cannot begin until all three partitions are evaluated, the overall results of the stage are delayed.

Dealing With Data Skew 

Data skew problems are more apparent in situations where data needs to be shuffled in an operation such as a join or an aggregation. Shuffle is an operation done by Spark to keep related data (data pertaining to a single key) in a single partition. For this, Spark needs to move data around the cluster. Hence, shuffle is considered the most costly operation.

Common symptoms of data skew are:

  • Frozen stages and tasks.
  • Low CPU utilization.
  • Out-of-memory errors.

There are several tricks we can employ to deal with the data skew problem in Spark.

Identifying and Resolving Data Skew
Spark users often observe that most tasks finish within a reasonable amount of time, only to have one task take forever. In all likelihood, this is an indication that your dataset is skewed. This behavior also results in the overall underutilization of the cluster. This is especially a problem when running Spark in the cloud, where over-provisioning of cluster resources is wasteful and costly.
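One quick way to confirm the suspicion is to count rows per key and look at the heaviest keys. Here is a minimal sketch; the input path and the "key" column name are assumptions for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.desc

    // Sketch: count rows per key to spot skewed keys. `df` and the "key"
    // column are placeholders for the dataset and join key being inspected.
    val spark = SparkSession.builder().appName("skew-check").getOrCreate()
    val df = spark.read.parquet("/path/to/table")   // hypothetical input path

    df.groupBy("key")
      .count()
      .orderBy(desc("count"))
      .show(20)   // the heaviest keys appear at the top

If one or a handful of keys dominate the counts, the shuffle stages that join or aggregate on that column are the ones likely to stall.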

If skew is at the data source level (e.g. a Hive table is partitioned on the _month key and the table has a lot more records for a particular _month), this will cause skewed processing in the stage that is reading from the table. In such a case, restructuring the table with a different partition key (or keys) helps. However, sometimes that is not feasible, as the table might be used by other data pipelines in the enterprise. In such cases, there are several things that we can do to avoid skewed data processing.

Data Broadcast
If we are doing a join operation on a skewed dataset, one trick is to increase the spark.sql.autoBroadcastJoinThreshold value so that the smaller table gets broadcast. Before doing so, make sure there is sufficient driver and executor memory to hold the broadcast table.
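For example, the threshold can be raised in configuration, or the broadcast can be requested explicitly with a hint. This is a sketch; the 100 MB value and the DataFrame names are illustrative, and an active SparkSession `spark` is assumed:

    import org.apache.spark.sql.functions.broadcast

    // Raise the auto-broadcast threshold to ~100 MB (the value is in bytes),
    // so that any table smaller than this is broadcast instead of shuffled.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

    // Or request the broadcast explicitly for a known-small dimension table.
    val joined = largeSkewedDf.join(broadcast(smallDimDf), Seq("key"))

With a broadcast join, the small table is shipped to every executor and the skewed table is never shuffled on the join key, so the skew does not matter.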

Data Preprocessing

If there are too many null values in a join or group-by key, they will skew the operation. Try to preprocess the null values by replacing them with random ids, and handle those rows appropriately in the application.
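One way to do this, sketched below under the assumption of a string join key column named `join_key`, is to replace nulls with random surrogate values so those rows spread across many partitions instead of piling into one:

    import org.apache.spark.sql.functions._

    // Replace null join keys with random surrogate values so the affected rows
    // hash to many partitions instead of all landing in the "null" partition.
    // Surrogate keys will not match anything on the other side, which preserves
    // inner-join semantics for rows whose key was null.
    val preprocessed = df.withColumn(
      "join_key",
      when(col("join_key").isNull,
           concat(lit("null_"), (rand() * 10000).cast("int").cast("string")))
        .otherwise(col("join_key"))
    )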

Garbage Collection

Spark runs on the Java Virtual Machine (JVM). Because Spark can store large amounts of data in memory, it relies heavily on Java’s memory management and garbage collection (GC). GC can therefore be a major issue affecting many Spark applications.

Common symptoms of excessive GC in Spark are:

  • Slow application performance.
  • Executor heartbeat timeout.
  • GC overhead limit exceeded error.

Spark’s memory-centric approach and the data-intensive nature of its applications make GC a more common issue than it is for other Java applications. Thankfully, it is easy to diagnose whether your Spark application is suffering from a GC problem: the Spark UI marks executors in red if they have spent too much time doing GC.

You can see how much CPU time executors are spending on garbage collection by looking at the “Executors” tab in the Spark application UI. Spark marks an executor in red if it has spent more than 10% of its task time in garbage collection, as you can see in the diagram below.

[Figure: Apache Spark UI, with executors that spent excessive time in GC highlighted in red]

Addressing Garbage Collection Issues

Here are some of the basic things we can do to try to address GC issues.

Data Structures

  • If using RDD-based applications, prefer data structures with fewer objects; for example, use an array instead of a list, as in the sketch below.
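A minimal illustration of the difference:

    // A List[Int] allocates a cons cell plus a boxed java.lang.Integer per
    // element; an Array[Int] is a single contiguous block of primitive ints,
    // so it creates far fewer objects for the garbage collector to track.
    val boxed:   List[Int]  = List.tabulate(1000)(identity)
    val compact: Array[Int] = Array.tabulate(1000)(identity)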

Specialized Data Structures

  • If you are dealing with primitive data types, consider using specialized data structures like Koloboke or fastutil. These structures optimize memory usage for primitive types.
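For illustration, here is a primitive-specialized map from fastutil. This is a sketch and assumes the fastutil library is on the classpath:

    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap

    // An int -> int map that stores keys and values as primitives:
    // no Integer boxing, and far fewer small objects for the GC to collect.
    val counts = new Int2IntOpenHashMap()
    counts.put(42, 1)
    val c: Int = counts.get(42)   // returns a primitive int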

Storing Data Off-Heap

The Spark execution engine and Spark storage can both store data off-heap. You can switch on off-heap storage using the following settings:

  • --conf spark.memory.offHeap.enabled=true
  • --conf spark.memory.offHeap.size=Xg

Be careful when using off-heap storage, as it does not affect the on-heap memory size, i.e. it won’t shrink the heap. So, to stay within an overall memory limit, assign a correspondingly smaller heap size.
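The same settings can also be applied when the session is built. In this sketch, the 8 GB off-heap size and the reduced executor heap are illustrative values only:

    import org.apache.spark.sql.SparkSession

    // Enable off-heap memory and cap it at 8 GB; shrink the executor heap
    // accordingly so the total memory per executor stays within its limit.
    val spark = SparkSession.builder()
      .appName("offheap-example")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "8g")
      .config("spark.executor.memory", "8g")   // smaller heap to compensate
      .getOrCreate()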

Built-in vs. User Defined Functions (UDFs)

  • If you are using Spark SQL, try to use the built-in functions as much as possible rather than writing new UDFs. Most of Spark’s built-in functions can work directly on UnsafeRow and don’t need to convert the data to wrapper types. This avoids creating garbage, and it also plays well with whole-stage code generation; see the sketch below.
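For example, compare a UDF with the equivalent built-in function. This is a sketch; the DataFrame `df` and its `name` column are assumptions:

    import org.apache.spark.sql.functions._

    // A UDF forces every row to be converted from Spark's internal binary
    // format into JVM objects (and back), creating short-lived garbage.
    val toUpperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)
    val viaUdf = df.select(toUpperUdf(col("name")))

    // The built-in `upper` operates on the internal row format and takes part
    // in whole-stage code generation, so it creates far less garbage.
    val viaBuiltin = df.select(upper(col("name")))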

Be Stingy About Object Creation

  • Remember that we may be working with billions of rows. If we create even a small temporary object of 100 bytes for each row, processing 1 billion rows creates roughly 1 billion × 100 bytes, i.e. about 100 GB, of garbage.
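For instance, in an RDD transformation one object can often be reused per partition instead of allocating a new one per row. This is a sketch; `rdd` is a placeholder for an RDD of values that can be rendered as strings:

    // Allocates a fresh builder for every row: billions of short-lived objects.
    val perRow = rdd.map { row => new StringBuilder(128).append(row).toString }

    // Reuses a single builder per partition: allocation cost is constant
    // per partition rather than per row. Each call to toString still
    // produces an independent String, so the reuse is safe.
    val perPartition = rdd.mapPartitions { rows =>
      val sb = new StringBuilder(128)
      rows.map { row =>
        sb.setLength(0)
        sb.append(row).toString
      }
    }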

Salting

In a SQL join operation, the join key can be changed to redistribute data more evenly, so that processing a single partition does not take significantly longer than the others. This technique is called salting. Let’s look at an example to better understand its effect. In a join or group-by operation, Spark maps a key to a particular partition id by computing a hash code on the key and taking it modulo the number of shuffle partitions.
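As a simplified illustration of that mapping (this is not Spark’s exact internal hash function, which is Murmur3-based for SQL shuffles):

    // Simplified sketch of how a key is mapped to a shuffle partition id.
    val numShufflePartitions = 200   // the default of spark.sql.shuffle.partitions

    def partitionFor(key: Any): Int = {
      val h = key.hashCode % numShufflePartitions
      if (h < 0) h + numShufflePartitions else h   // keep the id non-negative
    }

    // Every row with the same key always lands in the same partition:
    partitionFor(1)    // same id every time for key 1
    partitionFor("a")  // same id every time for key "a"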

Let’s assume there are two tables with the following schema.

[Figure: data schema of the two tables]

Let’s consider a case where a particular key is heavily skewed, e.g. key 1, and we want to join both tables and then group to get a count. For example:

[Figure: sample rows of the two tables and the resulting partition layout after the shuffle, with all key-1 rows in Partition 1]
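A query of that shape might look like the following. This is a sketch; the table and column names are illustrative stand-ins for the data shown in the figure:

    // Hypothetical tables sharing a skewed `key` column.
    val left  = spark.table("table_a")
    val right = spark.table("table_b")

    val counts = left
      .join(right, Seq("key"))   // the shuffle sends all rows of a key to one partition
      .groupBy("key")
      .count()

    counts.show()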

After the shuffle stage induced by the join operation, all the rows with the same key need to be in the same partition. Look at the above diagram. Here, all the rows with key 1 are in Partition 1. Similarly, all the rows with key 2 are in Partition 2. It is quite natural that processing Partition 1 will take more time, as the partition contains more data. Let’s check Spark’s UI for the shuffle stage run time for the above query.

[Figure: Spark UI showing shuffle stage task durations before salting, with one task running far longer than the rest]

As we can see, one task took a lot more time than other tasks. With more data it would be even more significant. Also, this might cause application instability in terms of memory usage as one partition would be heavily loaded.

Can we add something to the join key so that our dataset is more evenly distributed? Most users with skew problems use the salting technique. Salting is a technique where we add a random value to the join key of one of the tables. In the other table, we replicate the rows to match all possible random keys. The idea is that if the join condition is satisfied by key1 == key1, it should also be satisfied by key1_<salt> == key1_<salt>. The salt value helps the dataset to be distributed more evenly.

Here is an example of how to do that in our use case. Note the number 20, used both when generating the random salt and when exploding the dataset: it is the number of pieces we want to split each skewed key into. This is a very basic example and can be improved to salt only the keys that are actually skewed.

[Figure: code for the salted join, using a salt range of 20]
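Since the original snippet is only available as an image, here is a sketch of the same idea; the salt range of 20 and the table and column names are illustrative:

    import org.apache.spark.sql.functions._

    val saltCount = 20   // number of pieces to split each (skewed) key into

    val left  = spark.table("table_a")   // skewed table, as in the earlier sketch
    val right = spark.table("table_b")

    // Skewed side: append a random salt in [0, saltCount) to the join key.
    val saltedLeft = left.withColumn(
      "salted_key",
      concat(col("key").cast("string"), lit("_"),
             (rand() * saltCount).cast("int").cast("string"))
    )

    // Other side: replicate every row once per possible salt value so that
    // each salted key on the left still finds its match.
    val saltedRight = right
      .withColumn("salt", explode(array((0 until saltCount).map(lit): _*)))
      .withColumn("salted_key",
        concat(col("key").cast("string"), lit("_"), col("salt").cast("string")))

    // Join on the salted key, then aggregate back on the original key.
    val counts = saltedLeft
      .join(saltedRight, Seq("salted_key"))
      .groupBy(saltedLeft("key"))
      .count()

Each heavily loaded key is now split into up to 20 salted keys, which hash to different shuffle partitions, spreading the work across the cluster.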

Now let’s check the Spark UI again. As we can see, the processing time is much more even now.

[Figure: Spark UI showing shuffle stage task durations after salting, with roughly even task times]

Note that for smaller datasets the performance difference won’t be significant. Shuffle compression also plays a role in the overall runtime: for skewed data, the shuffled data can be compressed heavily due to its repetitive nature, so the overall disk I/O and network transfer are reduced as well. We need to run our application both without and with salting to decide which approach best fits our needs.

 
